package top.jiangqiang.examples.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

/**
 * 同步订阅
 */
@Slf4j
public class SimpleConsumerExample {

    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);

        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();
        String consumerGroup = "yourConsumerGroup";
        //每隔一定时间获取一次消息
        Duration awaitDuration = Duration.ofSeconds(3);
        String tag = "yourMessageTagA";
//        String topic = "DelayTopic"; //延时Topic
//        String topic = "TestTopic";
//        String topic = "TransactionTopic";
        String topic = "FifoTopic";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setAwaitDuration(awaitDuration)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        //每次最多接收的消息条数
        int maxMessageNum = 10;
        //指定时间内这些消息对其他消费者不可见，如果此消费者没有确认消费消息，该时间之后，其他消费者可以继续获取这些消息
        Duration invisibleDuration = Duration.ofSeconds(15);
        do {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            log.info("Received {} message(s)", messages.size());
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    //确认消费消息，如果不确认的话，后面获取消息时会重新获取到该消息。
                    consumer.ack(message);
                    log.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        } while (true);
//         consumer.close();
    }
}
